“You are at once both the quiet and the confusion of my heart.”
― Franz Kafka
可以取得叢集中代理的資訊,通常跟訊息、事件處理無關,這些資訊是用來監測跟做維護動作
await admin.describeCluster()
// {
// brokers: [
// { nodeId: 0, host: 'localhost', port: 9092 }
// ],
// controller: 0,
// clusterId: 'f8QmWTB8SQSLE6C99G4qzA'
// }
取得指定資源的設定資訊
await admin.describeConfigs({
includeSynonyms: <boolean>,
resources: <ResourceConfigQuery[]>
})
ResourceConfigQuery 的結構如下
{
type: <ConfigResourceType>,
name: <String>,
configNames: <String[]>
}
回傳指定資源的所有設定檔
const { ConfigResourceTypes } = require('kafkajs')
await admin.describeConfigs({
includeSynonyms: false,
resources: [
{
type: ConfigResourceTypes.TOPIC,
name: 'topic-name'
}
]
})
回傳指定資源的指定設定檔內容
const { ConfigResourceTypes } = require('kafkajs')
await admin.describeConfigs({
includeSynonyms: false,
resources: [
{
type: ConfigResourceTypes.TOPIC,
name: 'topic-name',
configNames: ['cleanup.policy']
}
]
})
資源類型對照表如下:
UNKNOWN: 0,
TOPIC: 2,
BROKER: 4,
BROKER_LOGGER: 8,
回傳範例
{
resources: [
{
configEntries: [{
configName: 'cleanup.policy',
configValue: 'delete',
isDefault: true,
configSource: 5,
isSensitive: false,
readOnly: false
}],
errorCode: 0,
errorMessage: null,
resourceName: 'topic-name',
resourceType: 2
}
],
throttleTime: 0
}
alterConfigs 可以修改指定資源的相關設定
await admin.alterConfigs({
validateOnly: false,
resources: <ResourceConfig[]>
})
ResourceConfig 的結構如下
{
type: <ConfigResourceType>,
name: <String>,
configEntries: <ResourceConfigEntry[]>
}
ResourceConfigEntry 的結構如下
{
name: <String>,
value: <String>
}
範例
const { ConfigResourceTypes } = require('kafkajs')
await admin.alterConfigs({
resources: [{
type: ConfigResourceTypes.TOPIC,
name: 'topic-name',
configEntries: [{ name: 'cleanup.policy', value: 'compact' }]
}]
})
回傳範例
{
resources: [{
errorCode: 0,
errorMessage: null,
resourceName: 'topic-name',
resourceType: 2,
}],
throttleTime: 0,
}
取得代理中有效的消費者群組清單
await admin.listGroups()
回傳範例
{
groups: [
{groupId: 'testgroup', protocolType: 'consumer'}
]
}
用消費者群組ID去取得消費者群組的詳細資料,使用方式跟 consumer.describeGroup() (https://kafka.js.org/docs/next/consuming#describe-group) 類似,但是允許你取得多個消費者群組的資料
await admin.describeGroups([ 'testgroup' ])
// {
// groups: [{
// errorCode: 0,
// groupId: 'testgroup',
// members: [
// {
// clientHost: '/172.19.0.1',
// clientId: 'test-3e93246fe1f4efa7380a',
// memberAssignment: Buffer,
// memberId: 'test-3e93246fe1f4efa7380a-ff87d06d-5c87-49b8-a1f1-c4f8e3ffe7eb',
// memberMetadata: Buffer,
// },
// ],
// protocol: 'RoundRobinAssigner',
// protocolType: 'consumer',
// state: 'Stable',
// }]
// `
可以使用 AssignerProtocol 的解碼方法去拿到 memeberMetadata 和 memberAssignment 的資料
範例如下
const memberMetadata = AssignerProtocol.MemberMetadata.decode(memberMetadata)
const memberAssignment = AssignerProtocol.MemberAssignment.decode(memberAssignment)
用消費者群組ID去刪除消費者群組
註記:只能夠刪除沒有跟任何消費者有連線的消費者群組
await admin.deleteGroups([groupId])
範例:
await admin.deleteGroups(['group-test'])
範例回傳
[
{groupId: 'testgroup', errorCode: 'consumer'}
]
因為此方法可以一次刪除多個消費者群組,其中一個或多個群組可能會刪除失敗,這些資訊會在報錯訊息中
try {
await admin.deleteGroups(['a', 'b', 'c'])
} catch (error) {
// error.name 'KafkaJSDeleteGroupsError'
// error.groups = [{
// groupId: a
// error: KafkaJSProtocolError
// }]
}